最佳实践:数据开发工作流中配置数据推送节点

DataWorks数据开发支持将数据推送作为节点,结合数据开发已有的工作流,提供了简单推送、合并推送、脚本推送、条件推送以及MaxCompute数据推送方式,本实践将为您带来这五种推送方式的实践。

背景信息

在DataWorks业务流程开发中,可以新增数据推送节点,该节点可根据业务流程处理完成数据后,通过简单的查询来获取所需要的数据,根据任务调度及时、快速地推送数据至钉钉群、飞书群、企业微信群以及Teams中。

实践思路

  1. 新增一个节点,用来准备测试数据,以便为后续不同推送流程提供数据支持。

  2. 新增数据查询节点、赋值节点以及其他数据处理节点,实现对测试数据处理与查询。

    说明

    准备测试数据节点与数据处理查询节点,在实际业务中可以根据实际情况进行调整,本案例以MySQL节点为例进行数据的推送。

  3. 新增数据推送节点,接收数据查询节点的输出参数,将上下文参数中的输入参数携带的数据推送至钉钉群、飞书群、企业微信群以及Teams中。

实践方式介绍

本实践共有五种数据推送节点与工作流的结合方式,分别为简单推送合并推送脚本推送条件推送以及MaxCompute数据推送方式。

  • 简单推送:上游工作流完成SQL查询后,通过节点上下文参数将查询结果输出至数据推送节点进行推送。

  • 合并推送:上游有多个工作流,分别完成SQL查询后,通过节点上下文参数将查询结果输出至一个数据推送节点进行推送。

  • 脚本推送:上游节点为赋值节点,通过赋值节点代码对数据进行处理并通过节点上下文参数将数据输出至推送节点进行推送。赋值节点详情,请参见赋值节点

  • 条件推送:上游流程通过分支节点对数据按照配置的条件进行判断,查询输出符合判断条件的数据,通过节点上下文参数将查询结果输出至数据推送节点进行推送。分支节点详情,请参见分支节点

  • MaxCompute数据推送:推送MaxCompute数据源上的数据,可通过赋值节点对MaxCompute进行查询后进行推送。

前提条件

在开启本实践前,您需要首先准备好以下内容:

  • 请确保已开通DataWorks服务,详情请参见开通DataWorks服务

  • 请确保已创建DataWorks空间,详情请参见创建工作空间

  • 请确保已通过RDS创建MySQL实例,并已在空间下创建MySQL数据源。详情请参见相关文档创建并管理数据源

  • 请确保在工作空间下已绑定MaxCompute数据源。详情请参见创建MaxCompute数据源

    说明

    本文以MySQL数据源与MaxCompute数据源为例,您可以根据自身需求新增不同类型的数据源。

使用限制

  • 数据推送功能推送至不同对象时的数据大小限制:

    • 推送目标为钉钉,推送数据大小不超过20KB

    • 推送目标为飞书,推送数据大小不超过30KB,图片小于10MB

    • 推送目标为企业微信,每个机器人发送的消息不能超过20条/分钟

    • 推送目标为Teams,推送大小不大于28KB

    说明

    钉钉移动端和企业微信暂不支持Markdown中定义的表格的渲染,建议在推送内容中使用表格组件来展示数据内容。飞书与Teams移动版支持正常展示Markdown中定义的表格。

  • 仅以下地域的DataWorks工作空间可使用数据推送功能:华东1(杭州)、华东2(上海)、华北2(北京)、华北3(张家口)、华南1(深圳)、西南1(成都)、中国香港、新加坡、马来西亚(吉隆坡)、美国(硅谷)、美国(弗吉尼亚)、德国(法兰克福)

准备流程

新建业务流程

  1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. 数据开发面板,右键单击业务流程,选择新建业务流程。并根据业务需要定义业务流程名称,本案例业务流程命名为数据推送Demo示例

创建流程节点

创建完成数据推送Demo流程业务流程后,双击流程名打开业务流程图页面,根据下表里您需要的推送方式,单击image 创建对应的节点。为方便完成该实践中的推送方式,请以表格内的节点名对创建的节点进行命名。

推送方式

节点名

节点类型

节点说明

条件推送

查询上月销售总额

MySQL节点

查询测试数据中上月销售总额,将查询结果以上下文参数的形式输出给分支节点。

达标条件判断

分支节点

接收MySQL节点传递的输出参数,按条件进行分支判断,将符合条件与不符合条件的数据以上下文参数的形式输出给不同MySQL节点。

上月销售达标数据

MySQL节点

接收分支节点判断达标的输出参数,查询出达标数据,以上下文参数的形式输出给数据推送节点。

上月销售不达标数据

接收分支节点判断不达标的输出参数,查询出不达标数据,以上下文参数的形式输出给数据推送节点。

推送销售达标前三名类别

数据推送

接收查询的达标输出参数,将上下文参数中的输入参数携带的数据推送至推送目标。

推送销售不达标倒数三名类别

接收查询的不达标输出参数,将上下文参数中的输入参数携带的数据推送至推送目标。

脚本推送

查询上一周销售数据

MySQL节点

查询测试数据中上周销售前三类总额,将查询结果以上下文参数的形式输出给赋值节点。

组织销售前三名列表

赋值节点

示例赋值语言为Python,接收MySQL节点的输出参数,并将其列表化后,再将结果以上下文参数的形式输出给数据推送节点。

推送上周前三名类别

数据推送

接收赋值节点列表的输出参数,将上下文参数中的输入参数携带的数据推送至推送目标。

合并推送

查询昨日销售总额

MySQL节点

  • 查询测试数据中昨日销售总额与昨日销售增长额,将查询结果以上下文参数的形式输出给数据推送节点。

  • 查询昨日销售总额节点可与简单推送中的昨日销售总额共用同一节点。

查询昨日销售增长额

推送昨日销售总额与增长额

数据推送

同时接收两个MySQL节点传递的输出参数,将上下文参数中的输入参数携带的数据推送至推送目标。

简单推送

查询昨日销售总额

MySQL节点

查询测试数据中的昨日销售总额,将查询结果以上下文参数的形式输出给数据推送节点。

推送昨日销售总额

数据推送

接收MySQL节点传递的输出参数,将上下文参数中的输入参数携带的数据推送至推送目标。

MaxCompute数据推送

MySQL同步数据至ODPS

离线同步

将MySQL内准备的数据,通过数据集成的方式同步至ODPS数据源之中。

ODPS数据查询

赋值节点

对ODPS数据源中的数据进行查询,再将查询结果以上下文参数的形式输出给数据推送节点。

ODPS数据推送

数据推送

接收赋值节点ODPS数据查询的输出参数,将上下文参数中的输入参数携带的数据推送至推送目标。

准备数据

在日常业务中存在多种多样的数据场景,本实践以简单的订单表为案例演示数据推送节点的使用,以下为创建并写入测试数据的步骤,若您无需测试数据,也可跳过该步骤。

创建测试表

  1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. 在左侧导航栏,单击image进入临时查询页面后,鼠标悬浮在image按钮,新建 > MySQL新增临时查询。

    • 节点类型:MySQL。

    • 路径:临时查询。

    • 名称:创建测试数据表。

  3. 创建测试表orders,表格SQL如下:

CREATE TABLE orders (
     order_id INT NOT NULL AUTO_INCREMENT,
     category VARCHAR(100) NOT NULL, -- 类别
     sales DOUBLE NOT NULL, -- 订单销售额
     datetime DATETIME NOT NULL, -- 订单发生时间
     PRIMARY KEY (order_id),
     INDEX (category)
);

创建存储过程

以下存储过程将随机生成过去两个月的订单表数据。

说明

存储过程需要在MySQL客户端中执行创建。

DELIMITER $$

CREATE PROCEDURE InsertOrders(IN num_orders INT)
BEGIN
  DECLARE v_category VARCHAR(100);
  DECLARE v_sales DOUBLE;
  DECLARE v_datetime DATETIME;
  DECLARE v_category_list VARCHAR(255);
  DECLARE v_index INT;
  DECLARE i INT DEFAULT 0;
  
  -- 定义类别列表的字符串,以逗号分隔
  SET v_category_list = 'Electronics,Books,Home & Kitchen,Fashion,Toys,Baby,Computers,Electronics,Games,Garden,Clothing,Grocery,Health,Jewelry,Kids';
  -- 获得类别的总数
  SET v_index = ROUND((RAND() * (CHAR_LENGTH(v_category_list) - CHAR_LENGTH(REPLACE(v_category_list, ',', '')) + 1)));
  
  WHILE i < num_orders DO
    -- 生成随机索引以选择类别
    SET v_index = FLOOR(1 + (RAND() * (CHAR_LENGTH(v_category_list) - CHAR_LENGTH(REPLACE(v_category_list, ',', '')) + 1)));
    -- 从类别列表中提取随机类别
    SET v_category = SUBSTRING_INDEX(SUBSTRING_INDEX(v_category_list, ',', v_index), ',', -1);
    
    -- 生成1000至30000之间的随机销售额
    SET v_sales = 1000 + FLOOR(RAND() * 29000);
    
    -- 在过去两个月内生成随机日期时间
    SET v_datetime = NOW() - INTERVAL FLOOR(RAND() * 61) DAY;
    
    -- 将新的随机订单插入到订单表中
    INSERT INTO orders (category, sales, datetime) VALUES (v_category, v_sales, v_datetime);
    
    SET i = i + 1;
  END WHILE;
END$$

DELIMITER ;

写入测试数据

存储过程创建成功后,可通过CALL语句调用存储过程,向orders表内存储随机生成的数据。

-- 调用存储过程以插入特定数量的随机订单
CALL InsertOrders(1000); -- 这将插入1000条随机订单

配置推送流程

在DataWorks推送流程根据不同的推送逻辑可以进行条件推送脚本推送合并推送简单推送以及MaxCompute数据推送,本实践以MySQL为例,演示五种其他数据源推送方式以及MaxCompute数据源数据推送示例。

条件推送

步骤一:搭建推送流程

双击打开数据推送Demo流程业务流程图页面,依次将查询上月销售总额通过达标条件判断分成达标上月销售达标数据推送销售达标前三名类别和不达标上月销售不达标数据推送销售不达标倒数三名类别,关联起来,生成一条如下图所示的条件推送的工作流。

image

步骤二:配置SQL查询节点

通过配置SQL查询节点对测试数据进行查询,通过节点上下文参数生成SQL查询节点的输出参数outputs,从而实现将SQL的查询结果传递至分支节点。

  1. 双击打开查询上月销售总额节点,编写以下查询SQL代码。

    -- 统计上个月销售额
    SELECT SUM(sales) AS sales_amount
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');
  2. 编辑完成SQL代码后,单击右侧调度配置,在弹出的调度配置面板中配置如下内容。

    • 定时调度时间08:00

    • 调度资源组:选择已创建好的Serverless资源组。

    • 依赖上游节点:勾选使用工作空间根节点

    • 本节点输出参数:在节点上下文参数下单击本节点输出参数后的添加赋值参数,添加输出参数作为下游节点输入参数的取值。image

步骤三:配置分支节点

在分支节点内,通过节点上下文参数本节点输入参数获取上游SQL查询节点的输出参数outputs,然后在分支节点进行条件判断后通过本节点输出参数输出符合条件的数据。

  1. 双击打开达标条件判断节点,单击添加分支,新增两个分支,分别为达标和不达标,详细配置信息如下

    配置项

    达标分支

    不达标分支

    分支条件

    ${inputs[0][0]}>=500000

    ${inputs[0][0]}<500000

    关联到节点输出

    达标。

    不达标。

    分支描述

    上月销售总额达标。

    上月销售总额不达标。

    说明

    [0][0]为二维数组,用来定位获取到的参数中用于条件判断的数据。

    • 上游为SQL查询节点情况下,使用二维数组,定位参数中需要作为条件的数据。

    • 上游为Python节点情况下,使用一维数组,定位参数中需要作为条件的数据。

  2. 双击打开达标条件判断节点,单击右侧调度配置,在弹出的调度配置中配置如下内容。

    参数类型

    参数内容与说明

    时间属性

    调度周期

    定时调度时间

    08:00

    重跑属性

    运行成功或失败后皆可重跑。

    资源属性

    调度资源组

    选择已创建好的调度资源组。

    说明

    初次使用数据推送节点,需先提工单升级调度资源组。

    本节点输出名称

    当配置分支后,输出名会自动解析并添加到此处。输出名与分支定义的关联到节点输出一致。

    节点上下文参数

    本节点输入参数

    参数名inputs

    取值来源:选择上游节点查询上月销售总额的输出参数outputs

    本节点输出参数

    系统默认添加为outputs

  3. 完成配置后,单击image保存达标条件判断节点。

步骤四:配置分支SQL查询节点

分支SQL查询节点在本实践中分为上月销售达标数据上月销售不达标数据两个节点,需要将分支节点上下文参数输出至上月销售达标数据节点和上月销售不达标数据节点,然后将分支节点SQL查询结果用节点上下文参数本节点输出参数分别输出至对应的达标不达标数据推送节点。

  1. 分别双击上月销售达标数据节点和上月销售不达标数据节点,进入编辑页面,编写以下SQL代码。

    上月销售达标数据

    SET @all_cat_sales_volume_month := 0.0;
    SELECT SUM(sales) INTO @all_cat_sales_volume_month FROM orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');
    
    -- 创建临时表
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
      category VARCHAR(255),
      sales DOUBLE,
      all_cat_sales_volume_month DOUBLE
    );
    --查询并写入临时表
    INSERT INTO temp_array (category, sales, all_cat_sales_volume_month) SELECT category, SUM(sales) AS amount, @all_cat_sales_volume_month FROM orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59') GROUP BY category ORDER BY amount DESC limit 3;
    --查询达标前三名数据
    SELECT category, sales, all_cat_sales_volume_month FROM temp_array;

    上月销售不达标数据

    SET @all_cat_sales_volume_month := 0.0;
    SELECT SUM(sales) INTO @all_cat_sales_volume_month FROM orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');
    
    --创建临时表
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
      category VARCHAR(255),
      sales DOUBLE,
      all_cat_sales_volume_month DOUBLE
    );
    
    --查询并写入临时表
    INSERT INTO temp_array (category, sales, all_cat_sales_volume_month) SELECT category, SUM(sales) AS amount, @all_cat_sales_volume_month FROM orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59') GROUP BY category ORDER BY amount ASC limit 3;
    
    --查询不达标的后三名数据
    SELECT category, sales, all_cat_sales_volume_month FROM temp_array;
  2. 编辑完成SQL代码后,单击右侧调度配置,在弹出的调度配置面板中配置如下内容。

    • 定时调度时间08:00

    • 调度资源组:选择已创建好的Serverless资源组。

    • 依赖的上游节点:添加上游的分支节点,因在步骤一:搭建推送流程已创建好流程,需查看上游依赖是否正确。

      • 上月销售达标数据节点:上游节点输出名为达标

      • 上月销售不达标数据节点:上游节点输出名为不达标

    • 本节点输出参数:在节点上下文参数下单击本节点输出参数后的添加赋值参数,添加输出参数作为下游节点输入参数的取值。image

  3. 单击image保存上月销售达标数据节点和上月销售不达标数据节点。

步骤五:配置分支数据推送节点

需要创建两个数据推送节点,分别配置节点上下文参数本节点输入参数,来获取上月销售达标数据上月销售不达标数据的分支SQL查询节点输出的outputs参数,在正文中使用这些参数并将其推送至各自的目标。

  1. 双击推送销售达标前三名类别节点和推送销售不达标倒数三名类别节点,进入节点后,单击调度配置,在调度配置面板中进行如下配置。

    参数类型

    参数内容

    图示

    调度参数

    参数名

    curdate

    image

    参数值

    $[yyyymmddhh:mi:ss]

    时间属性

    调度周期

    image

    定时调度时间

    08:00

    说明

    本实践以8:00为例,确保数据能在08:00时推送至目标渠道。您可以根据实际需要配置其他时间点。

    重跑属性

    运行成功或失败后皆可重跑。

    资源属性

    调度资源组

    选择已创建好的调度资源组。

    说明

    初次使用数据推送节点,需先提工单升级调度资源组。

    image

    节点上下文参数

    本节点输入参数

    单击添加新增本节点输入参数:

    • 参数名:inputs

    • 取值来源:

      • 推送销售达标前三名类别节点选择上月销售达标数据节点的输出参数outputs

      • 推送销售不达标倒数三名类别节点选择上月销售不达标数据节点的输出参数outputs

    • 推送销售达标前三名类别节点image

    • 推送销售不达标倒数三名类别节点image

  2. 调度配置完成后,即可配置数据推送内容,详情如下。

    • 数据推送目标:下拉数据推送目标选择所需的数据推送目标,若不存在,可单击下拉框右下角的创建数据推送目标,新建推送目标。

      参数

      说明

      类型

      支持圈选钉钉、飞书、企业微信与Teams。

      对象名称

      可按业务需求进行自定义。

      WebHook

      钉钉、飞书、企业微信机器人或Teams的Webhook,需要在相应的目标平台上获取。

      说明
    • 标题推送销售达标前三名类别推送销售不达标倒数三名类别

    • 正文:按需求进行配置,详情可参见配置推送内容

      说明

      正文中可直接使用上游SQL查询节点查询的字段名作为占位符,以获取上游输入的参数。

      • 推送销售达标前三名类别示例image

      • 推送销售不达标倒数三名类别示例image

  3. 配置完成后,单击image保存推送销售达标前三名类别节点和推送销售不达标倒数三名类别节点。

步骤六:测试条件推送流程

在完成条件推送流程配置后,需要对条件测试流程进行测试,以便后续提交发布。

  1. 双击打开数据推送Demo流程业务流程页面。

  2. 选中查询上月销售总额节点,右键选择运行节点及下游,等待运行完成即可。

    说明

    如果出现任务运行失败,选中需要查看的节点,单击右键选择查看日志即可查看日志。

    image

脚本推送

步骤一:搭建推送流程

双击打开数据推送Demo流程业务流程图页面,在业务流程图页面,可以看到已创建的多个节点,依次将查询上一周销售数据组织销售前三名列表推送上周前三名类别进行关联,生成一条如下图所示的脚本推送的工作流。

image

步骤二:配置SQL查询节点

通过配置SQL查询节点对测试数据进行查询,可以通过节点上下文参数生成SQL查询节点的输出参数outputs,从而实现将SQL的查询结果传递至赋值节点。

  1. 双击查询上一周销售数据节点,进入编辑页面,编写以下SQL代码。

    -- 统计上一周销售额
    SELECT category, SUM(sales) AS amount
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 WEEK), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59')
    GROUP BY category ORDER BY amount DESC limit 3;
  2. 编辑完成SQL代码后,单击右侧调度配置,在弹出的调度配置面板中配置如下内容。

    • 定时调度时间08:00

    • 调度资源组:选择已创建好的Serverless资源组。

    • 依赖上游节点:勾选使用工作空间根节点

    • 本节点输出参数:在节点上下文参数下单击本节点输出参数后的添加赋值参数,添加输出参数作为下游节点输入参数的取值。image

  3. 单击image保存查询上一周销售数据节点。

步骤三:配置赋值节点

赋值节点通过节点上下文参数下的本节点输入参数可以获取到上游SQL节点的输出参数outputs,将SQL节点的输出参数进行重新赋值调整,生成新的本节点输出参数,将新数据输出至数据推送节点。

  1. 双击组织销售前三名列表的赋值节点,进入节点编辑页面,编写以下Python代码。

    def main():
    
        From datetime import date
        today = date.today()
        formatted_date = today.strftime('%Y-%m-%d')
        
        msg = 'Stat date: ' + formatted_date + ' \\n\\n ' \
        '- 1: ${inputs[0][0]}, sales: ${inputs[0][1]} \\n\\n ' \
        '- 2: ${inputs[1][0]}, sales: ${inputs[1][1]} \\n\\n ' \
        '- 3: ${inputs[2][0]}, sales: ${inputs[2][1]} \\n\\n '
        
        print(msg)
    
    
    if __name__ == "__main__":
        import sys
        main()
  2. 编辑完成赋值代码后,单击右侧调度配置,在弹出的调度配置面板中配置如下内容。

    • 定时调度时间08:00

    • 调度资源组:选择已创建好的调度资源组。

    • 节点上下文参数

      • 本节点输入参数

        • 参数名inputs

        • 取值来源:上游查询上一周销售数据节点的输出参数outputs。

      • 本节点输出参数:由系统默认生成。

  3. 单击image保存组织销售前三名列表赋值节点。

步骤四:配置数据推送节点

通过配置节点上下文参数本节点输入参数,来获取上游赋值节点输出的outputs参数,在正文中使用这些参数并将其推送至目标。

  1. 双击名为推送上周前三名类别的数据推送节点,进入节点后,单击调度配置,在调度配置面板中进行如下配置。

    参数类型

    参数内容

    图示

    调度参数

    参数名

    curdate

    image

    参数值

    $[yyyymmddhh:mi:ss]

    时间属性

    调度周期

    image

    定时调度时间

    08:00

    说明

    本实践以8:00为例,确保数据能在08:00时推送至目标渠道。您可以根据实际需要配置其他时间点。

    重跑属性

    运行成功或失败后皆可重跑。

    资源属性

    调度资源组

    选择已创建好的调度资源组。

    说明

    初次使用数据推送节点,需先提工单升级调度资源组。

    image

    节点上下文参数

    本节点输入参数

    单击添加新增本节点输入参数:

    参数名:inputs

    取值来源:选择上游节点上周销售前三类的输出参数outputs

    image

  2. 调度配置完成后,即可配置数据推送内容,详情如下。

    • 数据推送目标:下拉数据推送目标选择所需的数据推送目标,若不存在,可单击下拉框右下角的创建数据推送目标,新建推送目标。

      参数

      说明

      类型

      支持圈选钉钉、飞书、企业微信与Teams。

      对象名称

      可按业务需求进行自定义。

      WebHook

      钉钉、飞书、企业微信机器人或Teams的Webhook,需要在相应的目标平台上获取。

      说明
    • 标题推送上周前三名类别

    • 正文:按需求进行配置正文即可,详情可参见配置推送内容

      说明

      正文中可直接使用上游SQL查询节点查询的字段名作为占位符,以获取上游输入的参数。

      image

  3. 配置完成后,单击image保存推送上周前三名类别推送节点。

步骤五:测试脚本推送流程

在完成脚本推送流程配置后,需要对脚本测试流程进行测试,以便后续提交发布。

  1. 双击数据推送Demo流程打开业务流程图页面。

  2. 选中查询上一周销售数据节点,右键选择运行节点及下游,等待运行完成即可。

    说明

    如果出现任务运行失败,选中需要查看的节点,单击右键选择查看日志即可查看日志。

    image

简单推送

步骤一:搭建推送流程

双击数据推送Demo流程后,在业务流程图页面,可以看到已创建的多个节点,需拖动MySQL查询节点查询昨日销售总额作为上游节点,并将其与下游的数据推送节点推送昨日销售总额关联起来,生成一条如下图所示的简单推送的工作流。

image

步骤二:配置SQL查询节点

通过配置SQL查询节点对测试数据进行查询,可以通过节点上下文参数生成SQL查询节点的输出参数outputs,从而实现将SQL的查询结果传递至数据推送节点。

  1. 双击查询昨日销售总额节点,编写以下查询SQL代码。

    -- 创建临时表temp_array
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
      total_amount DOUBLE
    );
    
    -- 将查询到的昨日销售总额写入临时表temp_array中
    INSERT INTO temp_array (total_amount) 
    SELECT SUM(sales)
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59');
    
    -- 查询临时表temp_array
    select total_amount FROM temp_array;
  2. 编辑完成SQL代码后,单击右侧调度配置,在弹出的调度配置面板中配置如下内容。

    • 定时调度时间08:00

    • 调度资源组:选择已创建好的Serverless资源组。

    • 依赖上游节点:勾选使用工作空间根节点

    • 本节点输出参数:在节点上下文参数下单击本节点输出参数后的添加赋值参数,添加输出参数作为下游节点输入参数的取值。image

  3. 单击image保存查询昨日销售总额节点。

步骤三:配置数据推送节点

通过配置节点上下文参数本节点输入参数,来获取上游SQL查询节点输出的outputs参数,在正文中使用这些参数并将其推送至目标。

  1. 双击名为推送昨日销售总额的数据推送节点,进入节点后,单击调度配置,在调度配置面板中进行如下配置。

    参数类型

    参数内容

    图示

    调度参数

    参数名

    curdate

    image

    参数值

    $[yyyymmddhh:mi:ss]

    时间属性

    调度周期

    image

    定时调度时间

    08:00

    说明

    本实践以8:00为例,确保数据能在08:00时推送至目标渠道。您可以根据实际需要配置其他时间点。

    重跑属性

    运行成功或失败后皆可重跑。

    资源属性

    调度资源组

    选择已创建好的调度资源组。

    说明

    初次使用数据推送节点,需先提工单升级调度资源组。

    image

    节点上下文参数

    本节点输入参数

    单击添加新增本节点输入参数:

    参数名 : inputs

    取值来源:选择上游节点查询昨日销售总额的输出参数outputs

    image

  2. 调度配置完成后,即可配置数据推送内容,详情如下。

    • 数据推送目标:下拉数据推送目标选择所需的数据推送目标,若不存在,可单击下拉框右下角的创建数据推送目标,新建推送目标。

      参数

      说明

      类型

      支持圈选钉钉、飞书、企业微信与Teams。

      对象名称

      可按业务需求进行自定义。

      WebHook

      钉钉、飞书、企业微信机器人或Teams的Webhook,需要在相应的目标平台上获取。

      说明
    • 标题推送昨日销售总额

    • 正文:按需求进行配置正文即可,详情可参见配置推送内容

      说明

      正文中可直接使用上游SQL查询节点查询的字段名作为占位符,以获取上游输入的参数。

      image

  3. 配置完成后,单击image保存推送昨日销售总额推送节点。

步骤四:测试简单推送流程

在完成简单推送流程配置后,需要对简单测试流程进行测试,以便后续提交发布。

  1. 双击打开数据推送Demo流程业务流程图页面。

  2. 选中查询昨日销售总额节点,右键选择运行节点及下游,等待运行完成即可。

    说明

    如果出现任务运行失败,选中需要查看的节点,单击右键选择查看日志即可查看日志。

    image

合并推送

步骤一:搭建推送流程

双击打开数据推送Demo流程业务流程图页面,在业务流程图页面,可以看到已创建的多个节点,拖拽上游节点查询昨日销售总额查询昨日销售增长额与下游节点推送昨日销售总额与增长额关联起来,生成一条如下图所示的合并推送的工作流。

说明

查询昨日销售总额节点可与简单推送中的昨日销售总额共用同一节点

image

步骤二:配置SQL查询节点

通过配置SQL查询节点对测试数据进行查询,可以通过节点上下文参数生成SQL查询节点的输出参数outputs,将SQL的查询结果传递至数据推送节点。

  1. 单击查询昨日销售增长额节点,编写以下查询SQL代码。

    -- 创建统计前日数据表temp_array1
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array1 (
      category VARCHAR(255),
      sales DOUBLE
    );
    -- 写入前日数据至temp_array1
    INSERT INTO temp_array1 (category, sales) SELECT category, SUM(sales)
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 2 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 2 DAY), '%Y-%m-%d 23:59:59')
    GROUP BY category;
    
    -- 创建统计昨日数据表temp_array2
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array2 (
      category VARCHAR(255),
      sales DOUBLE
    );
    -- 写入昨日数据至temp_array2
    INSERT INTO temp_array2 (category, sales) SELECT category, SUM(sales)
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59')
    GROUP BY category;
    
    -- 创建统计昨日销售增长额result
    CREATE TEMPORARY TABLE IF NOT EXISTS result (
      category VARCHAR(255),
      diff DOUBLE
    );
    -- 写入增长额数据至result
    INSERT INTO result (category, diff) SELECT temp_array2.category AS category, temp_array2.sales - temp_array1.sales AS diff FROM temp_array1 LEFT JOIN temp_array2 ON temp_array1.category = temp_array2.category;
    
    -- 查询增长额临时表result
    SELECT category, diff FROM result;

  2. 编辑完成SQL代码后,单击右侧调度配置,在弹出的调度配置面板中配置如下内容。

    • 定时调度时间08:00

    • 调度资源组:选择已创建好的Serverless资源组。

    • 依赖上游节点:勾选使用工作空间根节点

    • 本节点输出参数:在节点上下文参数下单击本节点输出参数后的添加赋值参数,添加输出参数作为下游节点输入参数的取值。image

  3. 单击image保存查询昨日销售增长额节点。

步骤三:配置数据推送节点

通过节点上下文参数本节点输入参数接收上游查询昨日销售总额查询昨日销售增长额的输出参数outputs,从而可以在正文中使用参数并将其推送至目标。

  1. 双击名为推送昨日销售总额与增长额的数据推送节点,进入节点后,单击调度配置,在调度配置面板中进行如下配置。

    参数类型

    参数内容

    图示

    调度参数

    参数名

    curdate

    image

    参数值

    $[yyyymmddhh:mi:ss]

    时间属性

    调度周期

    image

    定时调度时间

    08:00

    说明

    本实践以8:00为例,确保数据能在08:00时推送至目标渠道。您可以根据实际需要配置其他时间点。

    重跑属性

    运行成功或失败后皆可重跑。

    资源属性

    调度资源组

    选择已创建好的调度资源组。

    说明

    初次使用数据推送节点,需先提工单升级调度资源组。

    image

    节点上下文参数

    本节点输入参数

    单击添加新增本节点输入参数:

    参数一

    • 参数名inputs1

    • 取值来源:选择上游节点查询昨日销售总额的输出参数outputs

    参数二

    • 参数名inputs2

    • 取值来源:选择上游节点查询昨日销售增长额的输出参数outputs

    image

  2. 调度配置完成后,即可配置数据推送内容,详情如下。

    • 数据推送目标:下拉数据推送目标选择所需的数据推送目标,若不存在,可单击下拉框右下角的创建数据推送目标,新建推送目标。

      参数

      说明

      类型

      支持圈选钉钉、飞书、企业微信与Teams。

      对象名称

      可按业务需求进行自定义。

      WebHook

      钉钉、飞书、企业微信机器人或Teams的Webhook,需要在相应的目标平台上获取。

      说明
    • 标题推送昨日销售总额与增长额

    • 正文:按需求进行配置正文即可,详情可参见配置推送内容

      说明

      正文中可直接使用上游SQL查询节点查询的字段名作为占位符,以获取上游输入的参数。

      image

  3. 配置完成后,单击image保存推送昨日销售总额与增长额推送节点。

步骤四:测试合并推送流程

在完成简单推送流程配置后,才可对合并推送测试流程进行测试,以方便后续提交发布。

  1. 双击打开数据推送Demo流程业务流程图页面。

  2. 选中推送昨日销售总额与增长额节点,右键选择运行到该节点,等待运行完成即可。

    说明

    如果出现任务运行失败,选中需要查看的节点,单击右键选择查看日志即可查看日志。

    image

MaxCompute数据推送

步骤一:搭建推送流程

双击数据推送Demo流程后,在业务流程图页面,可以看到已创建的多个节点,需拖动离线同步节点作为赋值节点的上游。将MySQL同步数据至ODPSODPS数据查询以及ODPS数据推送三个节点依次连接起来。image

步骤二:配置离线同步节点

通过配置MySQL同步数据至ODPS节点,实现将准备数据阶段在MySQL中写入的测试数据同步至ODPS数据源,以供后续使用。

  1. 双击MySQL同步数据至ODPS节点进入离线同步节点的配置。

    配置项

    配置内容

    图示

    数据来源侧

    数据来源

    MySQL

    image

    数据源名称

    选择您创建的MySQL数据源。

    我的资源组

    选择Serverless 资源组。

    数据去向侧

    数据去向

    MaxCompute(ODPS)

    数据源名称

    选择该空间绑定的MaxCompute数据源。

    配置完成后,系统会自动进行数据源与资源组之间的连动性测试,待连通性展示为成功后即可单击下一步,进行数据来源与去向的详细配置。

  2. 配置数据来源与去向。

    配置项

    配置内容

    图示

    数据来源

    数据源

    保持默认:

    • MySQL。

    • 选择您创建的MySQL数据源。

    image

    选择orders表。

    数据过滤

    可按需求进行配置,本实践中置空即可。

    切分键

    您可以将源数据表中某一列作为切分键,建议使用主键或有索引的列作为切分键。

    数据预览

    预览从MySQL数据源中获取到的数据,可用于判断是否符合预期。

    数据去向

    数据源

    保持默认:

    • MaxCompute(ODPS)。

    • 选择该空间绑定的MaxCompute数据源。

    image

    Tunnel资源组

    即 Tunnel Quota,默认选择“公共传输资源”,即MC的免费quota。

    说明

    MaxCompute的数据传输资源选择,具体请参见购买与使用独享数据传输服务资源组。【注意】如果独享tunnel quota因欠费或到期不可用,任务在运行中将会自动切换为“公共传输资源”。

    schema

    下拉选择需要写入的schema。

    单击一键生成目标表结构,生成目标表。

    分区信息

    如果您每日增量数据限定在对应日期的分区中,可以使用分区做每日增量,比如配置分区pt值为${bizdate}

    写入模式

    入前清理已有数据(Insert Overwrite

  3. 配置好来源与去向后,将字段进行同名映射即可。image

  4. 通道配置。

    • 任务期望最大并发数:由于资源原因或者任务本身特性等原因,实际执行时并发数可能小于等于此值,收费按照实际执行的并发数收费,本实践选择2个并发即可。

    • 同步速率:通过限流可以保护数据来源端或者数据去向端的读写压力,不限流的情况下则会提供现有硬件环境下最大的传输性能,本实践不进行限流。

    • 脏数据策略:不容忍脏数据。

    • 分布式处理能力:默认为关闭状态,需要并发数大于等于8才能开启分布式。

    image

  5. 调度配置。

    完成集成任务配置后,单击右侧调度配置,在弹出的调度面板中配置如下内容。

    • 调度参数

      • 参数名bizdate

      • 参数值$[yyyymmdd-1]

    • 定时调度时间08:00

    • 重跑属性:下拉选择运行成功或失败后皆可重跑。

    • 调度资源组:选择已创建好的调度资源组。

    • 调度依赖:勾选使用工作空间根节点,作为离线同步任务的上游节点。

  6. 单击image保存MySQL同步数据至ODPS离线同步节点。

步骤三:配置赋值节点

ODPS数据源不支持使用ODPS SQL节点进行查询并通过节点上下文参数输出数据至数据推送节点。需采用赋值节点对ODPS数据进行查询处理,再通过节点上下文参数进行输出至数据推送节点。

  1. 双击ODPS数据查询节点,进入赋值节点。

    1. 在赋值节点编辑页面上方的请选择赋值语言的下拉框里选择ODPS SQL。

    2. 写入ODPS SQL。

      -- 首先,我们创建一个带有排名的子查询,用于计算每个分区(pt)内按销售额(sales)降序排列的顺序。
      -- DENSE_RANK() 函数会为每个分区内的行提供一个唯一的排名,相等的销售额会得到相同的排名,但不会跳过任何一个排名数字。
      --
      -- 子查询部分:
      -- 1. 从'orders'表中选择需要的列:order_id(订单ID)、category(类别)、sales(销售额)、datetime(时间)和pt(业务日期)。
      -- 2. 使用DENSE_RANK() OVER (PARTITION BY pt ORDER BY sales DESC):
         -- PARTITION BY pt 指定按照pt字段进行分区,即对每个不同的pt值分别进行排名。
         -- ORDER BY sales DESC 表示在每个分区内按照sales字段的降序来排序,即销售额最高的排在前面。
         -- rank 列将会存储每行在各自分区内的销售排名。
      --
      -- 主查询部分:
      -- 从上述带有排名的子查询中筛选出排名在前三名(rank <= 3)的记录。
      -- 这意味着对于每个业务日期,我们只选取销售额最高的前三个订单的信息。
      
      SELECT
        order_id,          -- 选择订单ID
        category,          -- 选择商品类别
        sales,             -- 选择销售额
        datetime,          -- 选择订单时间
        pt                  -- 选择业务日期
      FROM (
          SELECT
            order_id,
            category,
            sales,
            datetime,
            pt,
            DENSE_RANK() OVER (PARTITION BY pt ORDER BY sales DESC) AS rank  -- 计算每个pt分区内的销售额排名
          FROM orders
          WHERE pt = '${bizdate}'  -- 筛选业务日期为${bizdate}的记录
      ) AS ranked_orders
      WHERE rank <= 3  -- 只保留每个pt分区销售额排名前三的记录
  2. 编辑完成SQL代码后,单击右侧调度配置,在弹出的调度配置面板中配置如下内容。

    • 定时调度时间08:00

    • 调度资源组:选择已创建好的Serverless资源组。

    • 依赖上游节点:检查依赖的上游是否为MySQL同步数据至ODPS的离线同步节点。

    • 节点上下文参数:单击本节点输出参数后的添加,添加输出参数作为下游节点输入参数的取值。

      image

  3. 单击image保存ODPS数据查询节点。

步骤四:配置数据推送节点

通过配置节点上下文参数本节点输入参数,来获取上游赋值节点输出的outputs参数,在正文中使用这些参数并将其推送至目标。

  1. 双击名为ODPS数据推送的数据推送节点,进入节点后,单击调度配置,在调度配置面板中进行如下配置。

    参数类型

    参数内容

    图示

    调度参数

    参数名

    curdate

    image

    参数值

    $[yyyymmddhh:mi:ss]

    时间属性

    调度周期

    image

    定时调度时间

    08:00

    说明

    本实践以8:00为例,确保数据能在08:00时推送至目标渠道。您可以根据实际需要配置其他时间点。

    重跑属性

    运行成功或失败后皆可重跑。

    资源属性

    调度资源组

    选择数据推送节点功能上线日期后创建的资源组,若资源组为发布日期前,需提工单升级调度资源组。

    说明

    数据推送节点发布日期为2024年6月28日,DataWorks更多发布记录可参见:功能发布记录

    image

    节点上下文参数

    本节点输入参数

    单击添加新增本节点输入参数:

    • 参数名inputs

    • 取值来源:选择上游节点ODPS数据查询的输出参数outputs。

    image

  2. 调度配置完成后,即可配置数据推送内容,详情如下。

    • 数据推送目标:下拉数据推送目标选择所需的数据推送目标,若不存在,可单击下拉框右下角的创建数据推送目标,新建推送目标。

      参数

      说明

      类型

      支持圈选钉钉、飞书、企业微信与Teams。

      对象名称

      可按业务需求进行自定义。

      WebHook

      钉钉、飞书、企业微信机器人或Teams的Webhook,需要在相应的目标平台上获取。

      说明
    • 标题:ODPS数据推送

    • 正文:按需求进行配置即可,详情可参见配置推送内容

      说明

      正文中可直接使用上游赋值节点查询的字段名作为占位符,以获取上游输入的参数。

  3. 配置完成后,单击image保存ODPS数据推送节点。

步骤五:测试MaxCompute数据推送流程

在完成数据推送流程节点配置后,需要对数据推送流程进行测试,以便后续提交发布。

  1. 双击打开数据推送Demo流程业务流程页面。

  2. 选中MySQL同步数据至ODPS节点,右键选择运行节点及下游,等待运行完成即可。

    说明

    如果出现任务运行失败,选中需要查看的节点,单击右键选择查看日志即可查看日志。

    image

提交与发布

在完成数据推送流程配置后,双击数据推送Demo示例业务流程图页面,测试所有的数据推送流程是否能正常运行,测试成功后,需要提交发布。

  1. 在数据推送流程的编辑页面,单击image,运行业务流程。

  2. 待数据推送流程中的所有节点后出现image,单击image提交运行成功的数据推送流程。

  3. 选择提交对话框中需要提交的节点,勾选忽略输入输出不一致的告警

  4. 单击提交

  5. 提交成功后,即可在发布页面发布流程节点,详情可参见发布任务

后续步骤

数据推送流程将会随着配置的调度周期而运行,我们在运维页面可以对已发布的数据推送的各个任务节点进行各种的运维操作,详情请参见周期任务基本运维操作